package com.atguigu.api

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._


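/**
 * @description: Demo job that reads sensor readings from a text file and writes them to MySQL through a custom JDBC sink.
 * @time: 2020/6/21 13:10
 * @author: baojinlong
 **/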
object JdbcSinkTest {
  // Input file used when no path is supplied as the first program argument.
  private val DefaultInputPath: String =
    "E:/qj_codes/big-data/FlinkTutorial/src/main/resources/sensor.data"

  /**
   * Builds and runs the streaming job: read text lines, parse each line into a
   * SensorReading and hand every reading to MyJdbcSink.
   *
   * @param args optional program arguments; when the first one is non-blank it is used as
   *             the input file path, otherwise the built-in default path is read
   */
  def main(args: Array[String]): Unit = {
    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the parallelism
    environment.setParallelism(1)

    // Prefer a caller-supplied path so the job is not tied to one machine's directory layout.
    val inputPath: String =
      args.headOption.map(_.trim).filter(_.nonEmpty).getOrElse(DefaultInputPath)
    val inputStreamFromFile: DataStream[String] = environment.readTextFile(inputPath)

    // Basic transformation: one comma-separated line -> one SensorReading.
    // Fields are trimmed first because String.toLong rejects surrounding whitespace.
    // A line with fewer than three fields, or with non-numeric 2nd/3rd fields, still throws.
    val dataStream: DataStream[SensorReading] = inputStreamFromFile
      .map(data => {
        val dataArray: Array[String] = data.split(",").map(_.trim)
        SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
      })
    dataStream.addSink(new MyJdbcSink())
    environment.execute("sink simple test job")
  }
}

/**
 * Sink that writes each SensorReading to the `sensor` table over JDBC.
 *
 * For every record an UPDATE keyed on the sensor id is attempted first; when that
 * statement reports an update count of 0, an INSERT is issued instead.
 *
 * NOTE(review): the JDBC URL names no database, so the unqualified `sensor` table
 * depends on whatever default the server/driver applies — confirm.
 */
class MyJdbcSink() extends RichSinkFunction[SensorReading] {
  // JDBC connection and precompiled statements; assigned in open(), released in close().
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  /**
   * Opens the JDBC connection and prepares the insert and update statements.
   *
   * @param parameters configuration passed in by the caller; not read here
   */
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306", "root", "root")
    insertStmt = conn.prepareStatement("insert into sensor(sensor,temperature) values(?,?)")
    updateStmt = conn.prepareStatement("update sensor set temperature = ? where sensor = ?")
  }

  /**
   * Writes one reading: update the row for its id, or insert a row when nothing was updated.
   *
   * @param value   the reading to persist
   * @param context sink context; not used
   */
  override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
    // Try the update first.
    updateStmt.setDouble(1, value.temperature)
    updateStmt.setString(2, value.id)
    // NOTE(review): whether a matched-but-unchanged row counts as 0 depends on driver
    // settings — confirm against the MySQL driver configuration in use.
    val updatedRows: Int = updateStmt.executeUpdate()
    // No row was updated, so insert the reading instead.
    if (updatedRows == 0) {
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.executeUpdate()
    }
  }

  /**
   * Cleanup: closes both statements and the connection.
   *
   * Each resource is null-checked because open() may have failed before assigning it,
   * and the closes are nested in try/finally so a failure closing one resource does not
   * prevent the remaining ones from being closed.
   */
  override def close(): Unit = {
    try {
      if (insertStmt != null) insertStmt.close()
    } finally {
      try {
        if (updateStmt != null) updateStmt.close()
      } finally {
        if (conn != null) conn.close()
      }
    }
  }
}
